home *** CD-ROM | disk | FTP | other *** search
/ Pascal Super Library / Pascal Super Library (CW International)(1997).bin / DELPHI32 / SYS_TOOL / MULTI020 / MPIPES.PAS < prev    next >
Pascal/Delphi Source File  |  1993-09-08  |  8KB  |  272 lines

  1. unit MPipes;
  2. interface
  3. uses Multi;
  4.  
  5. { Multitasking pipes using #Semaphore#s. }
  6.  
  7. type
  8.   ca = array[0..65533] of char;
  9.     { array of bytes for accessing single chars in the pipe buffer }
  10.  
  11.   pPipe = ^tPipe;
  12.   tPipe = object
  13.     { A pipe is an object one task only writes to and one task only reads from.
  14.       If the writing task wants to write more than fits into the buffer, or the
  15.       reading task wants to read more than is available, it is put asleep using
  16.       a semaphore until the request can be performed. }
  17.     buf : ^ca;
  18.       { This is the pipe buffer }
  19.     bsize : word;
  20.       { The size of the buffer }
  21.     head,
  22.       { The head pointer in the pipe ring buffer }
  23.     tail : word;
  24.       { The tail pointer in the pipe ring buffer }
  25.     ba : word;
  26.       { The number of bytes in the pipe buffer }
  27.  
  28.     readsema : Semaphore;
  29.       { Tasks reading more than is in the pipe are #WaitFor#ed using
  30.         this semaphore }
  31.     writesema : Semaphore;
  32.       { Tasks writing more than is space in the pipe are #WaitFor#ed
  33.         using this semaphore }
  34.     InTasks, OutTasks : word;
  35.       { Counts the tasks that in- and output to this pipe }
  36.     HasRead, HasWritten : boolean;
  37.       { Tracks activities to pipe }
  38.  
  39.     constructor Init(size : word);
  40.       { Initialize a semaphore with 'size' bytes buffer space.
  41.         The pipe will work even with buffer size 1, but it will be a little
  42.         slower than necessary due to the task management overhead. }
  43.     destructor Done;
  44.       { #Kamikaze# all waiting tasks of #readsema# and #writesema#
  45.         and free the buffer }
  46.     function Put(ch : char) : boolean;
  47.       { Put ch into the pipe, #WaitFor#ing the current task if the buffer
  48.         is full. If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
  49.         Put returns TRUE, the task has to de-init and terminate. }
  50.     function PutBin(const m; count : word) : boolean;
  51.       { Put the first 'count' bytes from 'm' into the pipe, #WaitFor#ing
  52.         the current task if the buffer is full.
  53.         If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
  54.         PutBin returns TRUE, the task has to de-init and terminate. }
  55.     function PutS(const s : string) : boolean;
  56.       { Put s into the pipe as-is, can be read by GetS, #WaitFor#ing
  57.         the current task if the buffer is full.
  58.         If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
  59.         PutS returns TRUE, the task has to de-init and terminate. }
  60.     function WriteLn(const s : string) : boolean;
  61.       { Put s into the pipe like WriteLn puts it on the screen,
  62.         (i.e. followed by #13#10), #WaitFor#ing the current task
  63.         if the buffer is full.
  64.         If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
  65.         PutLn returns TRUE, the task has to de-init and terminate. }
  66.     function WaitUntilEmpty : boolean;
  67.       { Wait until the reading task has emptied the pipe buffer.
  68.         If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
  69.         WaitUntilEmpty returns TRUE, the task has to de-init and terminate. }
  70.     function Peek(var ch : char) : boolean;
  71.       { If there is at least one character in the pipe, read it into ch
  72.         and return TRUE, otherwise return FALSE. }
  73.     function Get : char;
  74.       { Read one char from the pipe, #WaitFor#ing the current task if the
  75.         buffer is empty.
  76.         Note:
  77.           If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
  78.           Get returns garbage, but it returns. So check #tTask.Poisoned# ! }
  79.     function GetBin(var m; count : word) : boolean;
  80.       { Read 'count' bytes from the pipe and write them to 'm'.
  81.         #WaitFor# the current task if the buffer is empty.
  82.         If GetBin returns TRUE, deinitialize and terminate.
  83.         Note: GetBin can only return TRUE if #tTask.HasExit# is TRUE. }
  84.     function GetS(var s : string) : boolean;
  85.       { Read s from the pipe as-is, written by PutS, #WaitFor#ing
  86.         the current task if the buffer is empty.
  87.         If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
  88.         GetS returns TRUE, the task has to de-init and terminate. }
  89.     function ReadLn(var s : string) : boolean;
  90.       { Read s from the pipe, waiting for terminating #13#10, #WaitFor#ing
  91.         the current task if the buffer is full.
  92.         If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
  93.         PutLn returns TRUE, the task has to de-init and terminate. }
  94.  
  95.     procedure NewInputTask;
  96.       { Tell the pipe that there is a new input task }
  97.     procedure NewOutputTask;
  98.       { Tell the pipe that there is a new output task }
  99.     procedure NoMoreInput;
  100.       { The inputting task may call this if it shuts down. The outputting
  101.         task is then terminated. }
  102.     procedure NoMoreOutput;
  103.       { The outputting task may call this if it shuts down. The inputting
  104.         task is then terminated. }
  105.   end;
  106.  
  107. implementation
  108.  
  109. constructor tPipe.Init(size : word);
  110. begin
  111.   HasRead := false;
  112.   HasWritten := false;
  113.   bsize := size; ba := 0; getmem(buf,size);
  114.   head := 0; tail := 0;
  115.   InitSemaphore(readsema); InitSemaphore(writesema);
  116.   intasks := 0; outtasks := 0
  117. end;
  118.  
  119. destructor tPipe.Done;
  120. begin
  121.   Kamikaze(readsema);
  122.   Kamikaze(writesema);
  123.   repeat if Switch then break until (intasks or outtasks) = 0;
  124.   freemem(buf,bsize)
  125. end;
  126.  
  127. procedure tPipe.NoMoreInput;
  128. begin
  129.   dec(intasks);
  130.   if (intasks = 0) and (outtasks > 0) then Done
  131. end;
  132.  
  133. procedure tPipe.NoMoreOutput;
  134. begin
  135.   dec(outtasks);
  136.   if (outtasks = 0) and (intasks > 0) then Done
  137. end;
  138.  
  139. procedure tPipe.NewInputTask;
  140. begin
  141.   haswritten := true;
  142.   inc(intasks)
  143. end;
  144.  
  145. procedure tPipe.NewOutputTask;
  146. begin
  147.   hasread := true;
  148.   inc(outtasks)
  149. end;
  150.  
  151. function tPipe.Put(ch : char) : boolean;
  152. var w : word;
  153. begin
  154.   Put := false;
  155.   while ba = bsize do
  156.     if ((outtasks = 0) and HasRead) or WaitFor(t^,writesema) then begin
  157.       t^.Poisoned := true;
  158.       Put := true;
  159.       exit
  160.     end;
  161.   w := head;
  162.   buf^[head] := ch; inc(head); if head >= bsize then head := 0; inc(ba);
  163.   if ba = 1 then Release(readsema) { Buffer war leer }
  164. end;
  165.  
  166. function tPipe.PutBin(const m; count : word) : boolean;
  167. var
  168.   w : word;
  169.   a : ca absolute m;
  170. begin
  171.   PutBin := false;
  172.   w := 0;
  173.   while w < count do begin
  174.     if Put(a[w]) then begin
  175.       PutBin := true;
  176.       exit
  177.     end;
  178.     inc(w)
  179.   end
  180. end;
  181.  
  182. function tPipe.PutS(const s : string) : boolean;
  183. var i : byte;
  184. begin
  185.   PutS := true;
  186.   for i := 0 to length(s) do
  187.     if Put(s[i]) then exit;
  188.   PutS := false
  189. end;
  190.  
  191. function tPipe.WriteLn(const s : string) : boolean;
  192. begin
  193.   WriteLn := PutBin(s[1],length(s)) or Put(#13) or Put(#10)
  194. end;
  195.  
  196. function tPipe.WaitUntilEmpty : boolean;
  197. begin
  198.   WaitUntilEmpty := false;
  199.   repeat
  200.     if Switch then begin
  201.       WaitUntilEmpty := true;
  202.       exit
  203.     end
  204.   until head = tail
  205. end;
  206.  
  207. function tPipe.Peek(var ch : char) : boolean;
  208. begin
  209.   if ba = 0 then
  210.     Peek := false
  211.   else begin
  212.     ch := buf^[tail];
  213.     Peek := true
  214.   end
  215. end;
  216.  
  217. function tPipe.Get : char;
  218. begin
  219.   while ba = 0 do  { Buffer leer }
  220.     if (haswritten and (intasks = 0)) or WaitFor(t^,readsema) then begin
  221.       t^.Poisoned := true;
  222.       exit
  223.     end;
  224.   Get := buf^[tail];
  225.   inc(tail); if tail >= bsize then tail := 0;
  226.   dec(ba);
  227.   if ba = bsize-1 then Release(writesema)
  228. end;
  229.  
  230. function tPipe.GetBin(var m; count : word) : boolean;
  231. var
  232.   w : word;
  233.   a : ca absolute m;
  234. begin
  235.   GetBin := false;
  236.   w := 0;
  237.   while w < count do begin
  238.     a[w] := Get;
  239.     if t^.Poisoned then begin
  240.       GetBin := true;
  241.       exit
  242.     end;
  243.     inc(w)
  244.   end
  245. end;
  246.  
  247. function tPipe.GetS(var s : string) : boolean;
  248. var i : byte;
  249. begin
  250.   GetS := true;
  251.   s[0] := Get; if t^.Poisoned then exit;
  252.   for i := 1 to length(s) do begin
  253.     s[i] := Get; if t^.Poisoned then exit;
  254.   end;
  255.   GetS := false
  256. end;
  257.  
  258. function tPipe.ReadLn(var s : string) : boolean;
  259. begin
  260.   ReadLn := true;
  261.   s := '';
  262.   repeat
  263.     s[length(s)+1] := Get;
  264.     if t^.Poisoned then exit;
  265.     if s[length(s)+1] = #13 then break else inc(s[0]);
  266.   until false;
  267.   if s[1] = #10 then delete(s,1,1);
  268.   ReadLn := false
  269. end;
  270.  
  271. end.
  272.